
[Runner] Add Spark 4 runner#38212

Open
tkaymak wants to merge 14 commits into apache:master from tkaymak:spark4-runner

Conversation

@tkaymak
Contributor

@tkaymak tkaymak commented Apr 16, 2026

Addresses #36841

This PR is split into smaller commits for easier review.

  1. Prepare the build system as we need to deal with Scala 2.13 and JDK 17+ is required
  2. Add Spark 4 runner
  3. Add Spark 4 job server

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the groundwork for supporting Spark 4 as an execution engine for Apache Beam pipelines. It introduces a new runner built on Spark's Structured Streaming framework, initially focusing on batch processing capabilities. The changes encompass comprehensive updates to the build configuration, core pipeline component translations, and a robust metrics integration to ensure compatibility and observability within the Spark 4 ecosystem.

Highlights

  • Spark 4 Runner Introduction: Introduced an experimental Spark 4 runner for Apache Beam, leveraging Spark's Structured Streaming API for pipeline execution.
  • Build System Updates: Updated the Gradle build system to support Spark 4.0.2 with Scala 2.13, including conditional Java 17 requirement and Kafka version adjustments for Scala 2.13 compatibility.
  • Core Pipeline Component Translation: Implemented translators for fundamental Beam transforms such as Impulse, ParDo, Combine (Globally, PerKey, GroupedValues), GroupByKey, Flatten, Window.Assign, and ReadSource, focusing on batch mode execution.
  • Enhanced Metrics Integration: Integrated Beam metrics with Spark's metrics system through new MetricsAccumulator, SparkBeamMetricSource, and custom sinks (CSV, Graphite), allowing Beam metrics to be exposed via Spark's monitoring interfaces.
  • Advanced Data Handling: Developed sophisticated data handling mechanisms including a custom Spark Session factory with Kryo serialization for Beam types, optimized Spark Encoder helpers for various Beam data structures (WindowedValue, KV, Collections, Maps), and a caching SideInputReader.


Ignored Files
  • Ignored by pattern: .github/workflows/** (2)
    • .github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
    • .github/workflows/beam_PreCommit_Java_Spark4_Versions.yml
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

apply plugin: 'org.apache.beam.module'
applyJavaNature(
enableStrictDependencies: true,
requireJavaVersion: (spark_version.startsWith("4") ? org.gradle.api.JavaVersion.VERSION_17 : null),
Contributor Author

@tkaymak tkaymak Apr 16, 2026


Spark 4 requires Java Version 17

Comment thread on runners/spark/4/build.gradle (Outdated)
@tkaymak
Contributor Author

tkaymak commented Apr 16, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces support for Spark 4.0.2 to the Apache Beam Spark runner, which includes a new structured streaming module, dependency updates for Java 17 and Scala 2.13, and various cross-version compatibility fixes. The code review identifies a critical bug where an incorrect function cast in the stateful runner would lead to a ClassCastException at runtime. Additionally, the feedback points out that the pipeline cancellation logic is incomplete because it does not interrupt the underlying execution future, and it identifies redundant code in the BoundedDatasetFactory that should be removed for clarity.

@tkaymak
Contributor Author

tkaymak commented Apr 16, 2026

If the file SparkGroupAlsoByWindowViaWindowSet.java is to remain shared, the current implementation is likely the best way to handle the cross-version compatibility issue without over-engineering.

If we prefer a cleaner implementation for Spark 4 at the cost of file duplication (or creating a version adapter), I can move the file and refactor it.

@tkaymak tkaymak changed the title [WIP] Add Spark 4 runner [Runner] Add Spark 4 runner Apr 16, 2026
@tkaymak
Contributor Author

tkaymak commented Apr 16, 2026

assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @Abacn for label build.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@tkaymak
Contributor Author

tkaymak commented Apr 16, 2026

The only failing test is ':runners:google-cloud-dataflow-java:worker:test' which seems unrelated to this PR - I will rebase from master and see if that fixes it.

@tkaymak
Contributor Author

tkaymak commented Apr 17, 2026

It's now green. Let me know if I should split this further @Abacn

tkaymak and others added 12 commits April 17, 2026 16:01
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.

Register the new :runners:spark:4 module in settings.gradle.kts.

These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
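The additive gating described above can be sketched roughly as follows. This is an illustrative sketch only: the property names spark_version and spark_scala_version come from the commit message, but the dependency coordinates and exact wiring are assumptions, not the PR's actual BeamModulePlugin / spark_runner.gradle code.

```groovy
// Illustrative only -- coordinates and wiring are assumptions, not the
// actual Beam build code.
def isSpark4 = spark_version.startsWith("4")
def spark_scala_version = isSpark4 ? "2.13" : "2.12"

dependencies {
    // Kafka test artifacts carry a Scala-version suffix, so the coordinate
    // must follow the selected Scala line.
    testImplementation "org.apache.kafka:kafka_${spark_scala_version}:${kafka_version}"
}

if (isSpark4) {
    // Spark 4 supports Java 17+ only; the Spark 3 path stays untouched
    // because this branch is never taken for spark_version 3.x.
    java.toolchain.languageVersion = JavaLanguageVersion.of(17)
}
```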
Add the Gradle build file for the Spark 4 structured streaming runner.
The module mirrors runners/spark/3/ — it inherits the shared RDD-base
source from runners/spark/src/ via copySourceBase and adds its own
Structured Streaming implementation in src/main/java.

Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (Spark 4 supports only
  structured streaming batch).
- Unconditionally adds --add-opens JVM flags required by Kryo on
  Java 17 (Spark 4's minimum).
- Binds Spark driver to 127.0.0.1 for macOS compatibility.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Add the Spark 4 structured streaming runner implementation and tests.
Most files are adapted from the Spark 3 structured streaming runner
with targeted changes for Spark 4 / Scala 2.13 API compatibility.

Key Spark 4-specific changes (diff against runners/spark/3/src/):

EncoderFactory — Replaced the direct ExpressionEncoder constructor
  (removed in Spark 4) with BeamAgnosticEncoder, a named class
  implementing both AgnosticExpressionPathEncoder (for expression
  delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
  .StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
  plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
  methods substitute the provided input expression via transformUp,
  enabling correct nesting inside composite encoders like
  Encoders.tuple().

EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
  built-in encoders that are AgnosticEncoder subclasses rather than
  ExpressionEncoder.

GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
  API (CreateNamedStruct, Literal$) to public Column API (struct(),
  lit(), array()), as required by Spark 4.

BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
  Dataset moved to org.apache.spark.sql.classic in Spark 4.

ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
  with JavaConverters.asScalaBuffer().toList() in seqOf().

GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
  TraversableOnce with IterableOnce (Scala 2.13 rename).

SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
  with Beam's vendored Guava.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Add GitHub Actions workflows for the Spark 4 runner module:

- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
  changes to runners/spark/**.  Currently a no-op (the sparkVersions
  map is empty) but scaffolds future patch version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
  the structured streaming test suite on Java 17.

Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
Remove endOfData() call in close method.
Add job-server and container build configurations for Spark 4,
mirroring the existing Spark 3 job-server setup. The container
uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
spark_job_server.gradle gains a requireJavaVersion conditional
for Spark 4 parent projects.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
The hostname binding hack is no longer needed now that the local
machine resolves its hostname to 127.0.0.1 via /etc/hosts.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
Called out in /ultrareview as a missing contributor checklist item.
Adds a Highlight line and a New Features / Improvements entry under
the 2.74.0 Unreleased section, referencing issue apache#36841.
Per /ultrareview feedback: the one-line comment didn't make clear why
the cast is safe. Expand it to note that SparkSession.builder() always
returns a classic.SparkSession at runtime, which is why the downcast
avoids reflection.
tkaymak added 2 commits April 17, 2026 16:01
Per /ultrareview feedback: the fallback branch silently swallowed the
second ClassNotFoundException. In practice one of the two classes is
always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could
mask a broken classpath. Emit a LOG.warn instead.
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks
were lexicographic string comparisons. They happened to work for 3.5.0
and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release
would compare less than "3.5.0" and silently drop the Spark 3.5+
dependencies and exclusions.

Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`,
keeps numeric parts, and compares component-by-component. Replace all
five call sites.
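A minimal sketch of such a closure, assuming the tokenize-and-compare behavior the commit message describes (the name isSparkAtLeast follows the commit message; the body here is an illustration, not the build file's actual code):

```groovy
// Sketch of a component-wise version comparison; "4.0.2-SNAPSHOT"
// tokenizes to [4, 0, 2], and missing components count as 0.
def isSparkAtLeast = { String actual, String minimum ->
    def parse = { String v ->
        v.split(/[.\-]/).findAll { it.isInteger() }.collect { it.toInteger() }
    }
    def a = parse(actual)
    def b = parse(minimum)
    for (i in 0..<Math.max(a.size(), b.size())) {
        def x = i < a.size() ? a[i] : 0
        def y = i < b.size() ? b[i] : 0
        if (x != y) return x > y
    }
    return true // equal versions satisfy "at least"
}

// Lexicographically "3.10.0" < "3.5.0", but component-wise it is newer:
assert isSparkAtLeast("3.10.0", "3.5.0")
assert isSparkAtLeast("4.0.2", "3.5.0")
assert !isSparkAtLeast("3.4.1", "3.5.0")
```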
@Abacn
Contributor

Abacn commented Apr 17, 2026

Thanks, and sorry for the late response. Since this is a large change, here are some generic comments:

  • I would like to take this opportunity to make the Spark runner source files shareable among versions, like what we've been doing for the Flink runner:

def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->

and other source override logics in the build.gradle

Basically, the lowest version (3) is built on top of runners/spark/src/, and the higher version (4) combines runners/spark/src/ and runners/spark/src/4 into a "build/source-override" directory, with the latter overriding the former. Doing this makes the diff much smaller and easier to maintain (generic fixes only need to change one version).

We can copy-paste the logic from runners/flink/flink_runner.gradle#L73

After the infra work is done, we only need to copy the sources that need changes to runners/spark/3/src, and for the review consider generating a diff between runners/spark/3/src and the same files in runners/spark/src, like what we did for Flink 2 support:

https://gist.github.com/Abacn/693c181134f839f04c4c97b42ecd2405

It's fine to have a single commit or a few commits; 5+ commits won't further help. Alternatively, we can split the support into multiple PRs: first check in the spark_runner.gradle change and the classic Spark runner, setting applyJavaNature(publish=false) while work is in progress; then complete the runner support; and then the job-server support.
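The Flink-style layering suggested above looks roughly like this. This is a hedged sketch loosely modeled on the referenced copySourceOverrides task in runners/flink/flink_runner.gradle; the concrete paths and task wiring for runners/spark are illustrative, not the real build code.

```groovy
// Illustrative sketch -- adapted from the Flink runner's pattern, with
// assumed paths for a hypothetical runners/spark adaptation.
def sourceOverridesBase = "${project.buildDir}/source-overrides/src"

def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
    copyTask.from "../src/main/java"  // shared base: runners/spark/src/
    copyTask.from "./src/main/java"   // per-version files, e.g. runners/spark/4/
    copyTask.into "${sourceOverridesBase}/main/java"
    // Later `from` entries win: version-specific files override shared ones.
    copyTask.duplicatesStrategy = DuplicatesStrategy.INCLUDE
}

compileJava.dependsOn copySourceOverrides
sourceSets.main.java.srcDirs = ["${sourceOverridesBase}/main/java"]
```

With this layout, the lowest supported major compiles straight from the shared base, while each higher major only carries the files that genuinely differ.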

tkaymak added a commit to tkaymak/beam that referenced this pull request Apr 17, 2026
…pt Flink-style version overrides

Move runners/spark/3/src/.../structuredstreaming/ (the only sources Spark 3
shipped) into the shared runners/spark/src/, and replace the existing
copySourceBase block in runners/spark/spark_runner.gradle with the
per-version source-overrides layout used by runners/flink/flink_runner.gradle:
the lowest spark_major builds straight from the shared base; higher majors
get a Copy task with DuplicatesStrategy.INCLUDE that merges shared +
previous majors + ./src so per-version files override.

Pure refactor; Spark 3's compiled output is unchanged. Prepares the tree
for the Spark 4 runner (apache#36841 / apache#38212), which lands as a small
overrides layer on top.
@tkaymak
Contributor Author

tkaymak commented Apr 17, 2026

Thank you for taking the time and guidance @Abacn!
Taking a swing at the source-override restructuring you suggested, I have split the work into two stacked branches on my fork:

PR 1 (refactor only, no Spark 4): #38233

  • Hoists runners/spark/3/src/.../structuredstreaming/ to shared runners/spark/src/ (Spark 3's only sources, so runners/spark/3/src/ is removed entirely).
  • Replaces the existing copySourceBase block in spark_runner.gradle with the same Copy + DuplicatesStrategy.INCLUDE layering used by runners/flink/flink_runner.gradle.
  • Spark 3 build behavior unchanged.

PR 2 (Spark 4, on top of PR 1): branch spark4-runner-slim.

  • Same 14 commits as this PR, plus a final cleanup that drops byte-identical structured-streaming files from runners/spark/4/src/. Only the 11 files that genuinely differ for Spark 4 / Scala 2.13 remain as overrides.

How would you like to proceed? Options I see:

  1. Land PR 1 first, then I close this PR and open a fresh PR 2 against master once 1 is merged.
  2. Keep this PR open and rebase it onto PR 1's branch once 1 merges (turns this PR into the slim version automatically).

Happy to switch the head branch of this PR to spark4-runner-slim immediately if that's easier — but didn't want to force-push without your sign-off given existing review threads here.
